今天是鐵人賽的最後一天,最後一個主題想介紹 Go 也可以運用在 Hadoop 的 HDFS 上.所以 Go 可以運用在很多地方.
簡單稍微介紹一下,HDFS(Hadoop Distributed File System) 是一種分散式的檔案系統.使用上跟一般的檔案系統差不多,只不過是要透過 hdfs 的指令去執行.而 Redis 是一種 in memory 的 key-value 資料庫,速度很快,而且資料量太大可以使用多台 Redis 組成 Cluster 的模式.
使用 hdfs dfs -ls 看 /user/miuser/data/model/20181030_person 底下的檔案
> hdfs dfs -ls /user/miuser/data/model/20181030_person
18/10/30 12:50:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
-rw-r--r-- 2 miuser supergroup 0 2018-10-30 12:44 /user/miuser/data/model/20181030_person/_SUCCESS
-rw-r--r-- 2 miuser supergroup 185979860 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00000
-rw-r--r-- 2 miuser supergroup 185978526 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00001
-rw-r--r-- 2 miuser supergroup 185973527 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00002
-rw-r--r-- 2 miuser supergroup 185977162 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00003
-rw-r--r-- 2 miuser supergroup 185986465 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00004
使用 hdfs dfs -cat part-00004 這檔案的內容
> hdfs dfs -cat /user/miuser/data/model/20181030_person/part-00004 | head -n 1
18/10/30 12:52:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0,0,hash:trackrapid68427621,0|||||hash:trackrapid68427621|10071,9;10058,9;10080,9;10328,9
目標就是利用 Go 把上面的 HDFS 當案 parser 後寫入 Redis Cluster.
HDFS 部分參考 HDFS for Go,API 使用可以參考 GoDoc,下載
go get github.com/colinmarc/hdfs
Redis Cluster 部分參考 go-redis.下載
go get github.com/go-redis/redis
主程式
package main
import (
"bufio"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/colinmarc/hdfs"
"github.com/go-redis/redis"
)
/*
HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.1:8020 ./enrich-backupToRedis
hdfsDirPath := "/user/miuser/data/model/20181024_person"
hdfsClient, _ := hdfs.New("192.168.3.1:8020")
*/
func main() {
start := time.Now()
fmt.Println("HDFS to Redis start Time : " + start.Format("20060102150405"))
var client *redis.ClusterClient
hdfsDirPath := os.Getenv("HDFS_DIR")
hdfsNameNode := os.Getenv("HDFS_NAMENODE")
// 給 HDFS 的 Namenode ip
hdfsClient, _ := hdfs.New(hdfsNameNode)
//讀取 HDFS 的目錄
hdfsFiles, _ := hdfsClient.ReadDir(hdfsDirPath)
//取得 redis cluster 的 client
client = redis.NewClusterClient(&redis.ClusterOptions{
// 給 redis cluster 的 ip (這邊用了 3 台 redis)
Addrs: []string{"192.168.3.1:7000", "192.168.3.2:7000", "192.168.3.3:7000"},
Password: "test",
})
// 測試看看Redis 連線是否正常
if !pingRedisCluster(client) {
fmt.Println("Connect RedisCluster Fail ! ")
os.Exit(1)
}
var exeCount = 0
var m sync.Mutex
var wg sync.WaitGroup
wg.Add(len(hdfsFiles))
for _, hdfsFile := range hdfsFiles {
var hdfsnm string
hdfsnm = hdfsFile.Name()
go func() {
defer wg.Done()
// 檔名為 part 開頭
if strings.HasPrefix(hdfsnm, "part") {
hdfsFile := hdfsDirPath + "/" + hdfsnm
//取得 HDFS 的 File
file, _ := hdfsClient.Open(hdfsFile)
//一行一行讀取
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lineData := scanner.Text()
//將每一行資料寫入 Redis cluster
personModelToRedis(client, lineData)
m.Lock()
exeCount++
m.Unlock()
}
}
}()
}
wg.Wait()
end := time.Now()
executeTime := end.Sub(start)
fmt.Println("HDFS to Redis end Time : " + end.Format("20060102150405"))
fmt.Printf("HDFS to Redis executeTime : %v , executeCount : %d ", executeTime, exeCount)
}
func personModelToRedis(client *redis.ClusterClient, psersonStr string) {
//HDFS : 0,0;1;2(certset),id,last | locations | urls | kvs | profiles | ids | labels
personModel := make(map[string]interface{})
modelPart := strings.Split(psersonStr, "|")
personPart := strings.Split(modelPart[0], ",")
personID := personPart[2]
personModel["certset"] = personPart[1]
personModel["id"] = personID
personModel["last"] = personPart[3]
personModel["locations"] = modelPart[1]
personModel["urls"] = modelPart[2]
personModel["kvs"] = modelPart[3]
personModel["profiles"] = modelPart[4]
personModel["ids"] = modelPart[5]
personModel["labels"] = modelPart[6]
client.HMSet("usrProfile_"+personID, personModel)
}
func pingRedisCluster(client *redis.ClusterClient) bool {
err := client.Ping().Err()
if err == nil {
return true
} else {
return false
}
}
打包成可在 linux 上執行的執行檔,需要再 go build 前面加上參數
env GOOS=linux GOARCH=amd64 go build example1.go
給環境變數的執行方式
HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.65:8020 ./enrich-backupToRedis
使用 os.Getenv 可以在執行程式時,抓到環境變數的值.
最後整理一下這 30 天的一些主題,主要分成 go 語言的基礎與進階,接下來就是 go 的應用,每一項主題都還可以花時間再研究的更仔細.但 go 最大的優點應該是使用 goroutine 的平行處理,只要用得好 goroutine 對程式的效能來說會提升不少.
1.variables
2.pointer
3.new
4.type
5.Array & slice
6.map & struct
7.function
8.methods
9.interface
10.flow control
11.select case
1.goroutine
2.channel
3.sync.WaitGroup & sync.Mutex
4.io
5.json
6.testing
7.benchmark
8.error handling
9.reflection
1.web server
2.gin-gonic
3.mongodb
4.HDFS to Redis
...
感謝~~~~~~~